In [ ]:
# Author: Stephen Situ
# Apache Kafka is an event streaming platform used for building real-time data pipelines and streaming applications. Using an Aiven
# Kafka service and the kafka-python library, we create a streaming data pipeline. Kafka uses the concepts of producers and consumers,
# with brokers acting as the middleman. The broker has topics, and within the topics themselves, there can be different partitions for the data.
# Using the key, we can send messages to different partitions and from the consumer side, the key will ensure they will receive messages
# from the right partition. In addition, it is also possible to use consumer groups to manage consumers. 
# Kafka Connect can be used to integrate with existing data workflows without needing to modify code.
In [9]:
# Create a kafka producer with the correct credentials 
# Serializers are set to use the json module to convert keys and values to JSON strings
# and encode them as ASCII before sending to the Kafka topic. This is needed for keys and values
# to be understood by a Kafka topic

import time
import json
from kafka import KafkaProducer


# Build the producer used by the cells below. The connection uses SSL; the CA
# file, client certificate and private key are given as relative paths.
producer = KafkaProducer(
    # Plain string literal: there is nothing to interpolate, so no f-prefix.
    bootstrap_servers="kafka-4da1624-wssitu-05f0.aivencloud.com:19068",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    # Keys and values are dumped to JSON text and then encoded as ASCII bytes.
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii'),
)


# Topic that the messages below are published to.
topic_name = "Pizza"
In [36]:
# We use the send method to send a message to a Kafka topic.
# The flush method forces any buffered message to be sent to the Kafka broker.
# Order 1: John, Cheese. Both the key and the value are passed as plain dicts.
producer.send(topic_name, key={"id": 1}, value={"name": "John", "pizza": "Cheese"})
producer.flush()
In [37]:
# Order 2: Steve, Pepperoni. Both the key and the value are passed as plain dicts.
producer.send(topic_name, key={"id": 2}, value={"name": "Steve", "pizza": "Pepperoni"})
# Push any buffered message out to the Kafka broker.
producer.flush()
In [38]:
# Order 3: Kate, Steak. Both the key and the value are passed as plain dicts.
producer.send(topic_name, key={"id": 3}, value={"name": "Kate", "pizza": "Steak"})
# Push any buffered message out to the Kafka broker.
producer.flush()
In [39]:
# Order 4: Jason, Chicken. Both the key and the value are passed as plain dicts.
producer.send(topic_name, key={"id": 4}, value={"name": "Jason", "pizza": "Chicken"})
# Push any buffered message out to the Kafka broker.
producer.flush()
In [40]:
# Order 5: Kevin, Donaid. Both the key and the value are passed as plain dicts.
producer.send(topic_name, key={"id": 5}, value={"name": "Kevin", "pizza": "Donaid"})
# Push any buffered message out to the Kafka broker.
producer.flush()